package com.flow.framework.core.system.thread.pool.executor;

import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.core.system.checker.ThreadPoolStatusChecker;
import com.flow.framework.core.system.thread.pool.policy.RejectedPolicy;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 自定义拒绝策略处理器持有者
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/1/23
 */
@Slf4j
class CustomizationRejectedHandlerHolder {

    /**
     * Resolves the built-in rejected-execution handler singleton for the given policy.
     *
     * @param rejectedPolicy the policy to look up; switching on {@code null} raises a
     *                       {@link NullPointerException}
     * @return the shared handler instance mapped to {@code rejectedPolicy}
     * @throws CheckedException with {@code SystemErrorCode.OBJECT_NOT_FOUND_ERROR} when the policy
     *                          has no handler mapped in this switch
     */
    static CustomizationRejectedHandler getCustomizationHandler(RejectedPolicy rejectedPolicy) {
        switch (rejectedPolicy) {
            case DISCARD_POLICY:
                // Drop the task that is being submitted.
                return DiscardPolicy.INSTANCE;
            case DISCARD_OLDEST_POLICY:
                // Drop the oldest queued task, then retry the submission.
                return DiscardOldestPolicy.INSTANCE;
            case THROW_EXCEPTION_POLICY:
                // Fail the submission with an exception.
                return ThrowExceptionPolicy.INSTANCE;
            case BLOCK_QUEUE_POLICY:
                // Block until the task has been placed on the queue.
                return BlockQueuePolicy.INSTANCE;
            case CALLER_RUN_POLICY:
                // Run the task on the submitting thread.
                return CallerRunPolicy.INSTANCE;
            default:
                // No handler is registered for this policy value.
                throw new CheckedException(SystemErrorCode.OBJECT_NOT_FOUND_ERROR);
        }
    }

    /**
     * Rejection policy that evicts the task at the head of the executor's queue and then
     * resubmits the rejected task.
     */
    static class DiscardOldestPolicy implements CustomizationRejectedHandler {

        private static final DiscardOldestPolicy INSTANCE = new DiscardOldestPolicy();

        private DiscardOldestPolicy() {
        }

        /**
         * Handles a rejected task by discarding the oldest queued task and retrying the submission.
         *
         * @param r                            the task that the executor rejected
         * @param e                            the executor that rejected the task
         * @param threadPoolStatusListener     forwarded unchanged to the delegate handler; not
         *                                     otherwise used by this policy
         * @param customizationRejectedHandler optional override; when non-null it handles the
         *                                     rejection instead of this policy and is itself
         *                                     invoked with a {@code null} delegate
         * @throws CheckedException with {@code SystemErrorCode.SUBMIT_TASK_ERROR} when the executor
         *                          is shut down, or when the evicted task is not a
         *                          {@link RunnableFuture}
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e, ThreadPoolStatusChecker threadPoolStatusListener,
                                      CustomizationRejectedHandler customizationRejectedHandler) {
            if (null != customizationRejectedHandler) {
                customizationRejectedHandler.rejectedExecution(r, e, threadPoolStatusListener, null);
                return;
            }
            if (e.isShutdown()) {
                log.error("submit task error, thread pool is shutdown.");
                throw new CheckedException(SystemErrorCode.SUBMIT_TASK_ERROR);
            }

            Runnable removeRunnable = e.getQueue().poll();
            if (null != removeRunnable) {
                log.debug("old task discard.");
                if (removeRunnable instanceof RunnableFuture) {
                    try {
                        // Cancel the evicted task (not the incoming one) so that anything waiting
                        // on its future is released instead of blocking forever.
                        ((RunnableFuture<?>) removeRunnable).cancel(true);
                    } catch (Exception exception) {
                        log.debug("cancel task error.", exception);
                    }
                } else {
                    // NOTE: the evicted task has already been removed from the queue here.
                    log.error("old task error, old runnable type un-support.");
                    throw new CheckedException(SystemErrorCode.SUBMIT_TASK_ERROR);
                }
            }

            // Retry the submission. If the executor rejects again and this policy is the one
            // installed on it, this handler is re-entered and evicts the next oldest task.
            e.execute(r);
        }
    }

    /**
     * 丢弃当前提交的任务
     */
    static class DiscardPolicy implements CustomizationRejectedHandler {

        private static final DiscardPolicy INSTANCE = new DiscardPolicy();

        private DiscardPolicy() {
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e, ThreadPoolStatusChecker threadPoolStatusListener,
                                      CustomizationRejectedHandler customizationRejectedHandler) {
            if (null != customizationRejectedHandler) {
                customizationRejectedHandler.rejectedExecution(r, e, threadPoolStatusListener, null);
                return;
            }
            if (r instanceof RunnableFuture) {
                try {
                    log.debug("submit task error, task discard.");
                    ((RunnableFuture) r).cancel(true);
                } catch (Exception exception) {
                    log.debug("cancel task error.", exception);
                }
            } else {
                log.error("submit task error, runnable type un-support, task discard.");
                throw new CheckedException(SystemErrorCode.SUBMIT_TASK_ERROR);
            }
        }
    }

    /**
     * Rejection policy that fails the submission by throwing an exception.
     */
    static class ThrowExceptionPolicy implements CustomizationRejectedHandler {

        private static final ThrowExceptionPolicy INSTANCE = new ThrowExceptionPolicy();

        private ThrowExceptionPolicy() {
        }

        /**
         * Logs the rejection and throws, unless a delegate handler is supplied.
         *
         * @param r                            the task that the executor rejected
         * @param e                            the executor that rejected the task
         * @param threadPoolStatusListener     forwarded unchanged to the delegate handler; not
         *                                     otherwise used by this policy
         * @param customizationRejectedHandler optional override; when non-null it handles the
         *                                     rejection instead of this policy and is itself
         *                                     invoked with a {@code null} delegate
         * @throws CheckedException with {@code SystemErrorCode.SUBMIT_TASK_ERROR} whenever no
         *                          delegate handler is supplied
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e, ThreadPoolStatusChecker threadPoolStatusListener,
                                      CustomizationRejectedHandler customizationRejectedHandler) {
            if (null == customizationRejectedHandler) {
                log.error("submit task error, reject policy is throw exception.");
                throw new CheckedException(SystemErrorCode.SUBMIT_TASK_ERROR);
            }
            customizationRejectedHandler.rejectedExecution(r, e, threadPoolStatusListener, null);
        }
    }

    /**
     * 阻塞直到提交成功
     */
    static class BlockQueuePolicy implements CustomizationRejectedHandler {

        private static final BlockQueuePolicy INSTANCE = new BlockQueuePolicy();

        private BlockQueuePolicy() {
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e, ThreadPoolStatusChecker threadPoolStatusListener,
                                      CustomizationRejectedHandler customizationRejectedHandler) {
            if (null != customizationRejectedHandler) {
                customizationRejectedHandler.rejectedExecution(r, e, threadPoolStatusListener, null);
                return;
            }
            try {
                e.getQueue().put(r);
            } catch (Exception exception) {
                log.error("submit task error.", exception);
                throw new CheckedException(SystemErrorCode.SUBMIT_TASK_ERROR, "submit task error.", exception);
            }
        }
    }

    /**
     * Rejection policy that runs the rejected task on the thread that submitted it.
     */
    static class CallerRunPolicy implements CustomizationRejectedHandler {

        private static final CallerRunPolicy INSTANCE = new CallerRunPolicy();

        private CallerRunPolicy() {
        }

        /**
         * Runs the rejected task synchronously on the calling thread, unless the executor is shut
         * down, in which case nothing is done.
         *
         * @param r                            the task that the executor rejected
         * @param e                            the executor that rejected the task
         * @param threadPoolStatusListener     forwarded unchanged to the delegate handler; not
         *                                     otherwise used by this policy
         * @param customizationRejectedHandler optional override; when non-null it handles the
         *                                     rejection instead of this policy and is itself
         *                                     invoked with a {@code null} delegate
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e, ThreadPoolStatusChecker threadPoolStatusListener,
                                      CustomizationRejectedHandler customizationRejectedHandler) {
            if (null != customizationRejectedHandler) {
                customizationRejectedHandler.rejectedExecution(r, e, threadPoolStatusListener, null);
                return;
            }
            // Nothing is run once the executor has been shut down.
            if (e.isShutdown()) {
                return;
            }
            r.run();
        }
    }
}